package com.my.service.task;

import com.my.service.task.map.CarrierMap;
import com.my.service.task.reduce.CarrierReducer;
import com.my.service.task.util.MongoUtils;
import com.my.service.task.entity.CarrierInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.bson.Document;

import java.util.List;

public class CarrierTask {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text = env.readTextFile(params.get("input"));
        DataSet<CarrierInfo> mapResult = text.map(new CarrierMap());
        DataSet<CarrierInfo> reduceResult = mapResult.groupBy("groupfield").reduce(new CarrierReducer());
        List<CarrierInfo> resultList = reduceResult.collect();
        for (CarrierInfo cInfo : resultList) {
            String carrier = cInfo.getCarrier();
            Long count = cInfo.getCount();
            Document res = MongoUtils.findOneBy("carrierstatics",
                    "realtimeportrait",
                    carrier
            );
            if (res == null) {
                res = new Document();
                res.put("info", carrier);
                res.put("count", count);
            } else {
                Long countPre = res.getLong("count");
                Long total = countPre + count;
                res.put("count", total);
            }
            MongoUtils.saveOrUpdateMongo(
                    "carrierstatics",
                    "realtimeportrait",
                    res
            );
        }
        env.execute("carrier analysis");
    }
}
